Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add basic asyncio support #43944

Open
wants to merge 13 commits into
base: main
Choose a base branch
from

Conversation

dstandish
Copy link
Contributor

@dstandish dstandish commented Nov 12, 2024

This is meant to be sort of a hello world for asyncio support in airflow. It will be refined and extended in the future. E.g. probably we would add more config flexibility re the libraries, connect args etc. But it's good to start simple.

Anyway, ultimately, I think Airflow really needs to go in this direction: in the new REST API, in the new AIP-72 internal API server, in triggers, and ultimately, in the scheduler.

@tirkarthi
Copy link
Contributor

@dstandish
Copy link
Contributor Author

dstandish commented Nov 13, 2024

Is this related to AIP-70?

https://cwiki.apache.org/confluence/display/AIRFLOW/%5BWIP%5D+AIP-70+Migrating+to+asynchronous+programming

I think you prob know the answer. The AIP deals with asyncio, so yes strictly speaking related in that sense. But not "part of". That AIP has been in draft a long time not sure actively worked on. Anyway, I don't think what I'm doing here really requires an AIP.

@tirkarthi
Copy link
Contributor

There is a similar draft PR open as POC for it though not active. So just wanted to confirm. Thanks for the details.

#36504

Copy link
Member

@pierrejeambrun pierrejeambrun left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Really exciting.

Do we want an @provide_async_session as a follow up ? (I think Ash wanted to kill that decorator at some point, even if I find it kind of useful actually)

@ashb
Copy link
Member

ashb commented Nov 13, 2024

@pierrejeambrun A FastAPI dep to give an async session would be good, but I think anything in the models etc should be passed an explicit session.

@potiuk
Copy link
Member

potiuk commented Nov 13, 2024

NICE! Yeah. I really like to see it working with mysql compatibility and benchmarks 📦

(because of course MySQL is our beloved database)

@dstandish
Copy link
Contributor Author

Really exciting.

Do we want an @provide_async_session as a follow up ? (I think Ash wanted to kill that decorator at some point, even if I find it kind of useful actually)

i would say let's wait and see how / where / when we need it

@omkar-foss
Copy link
Collaborator

Lovely! Thank you for adding this, will try to start using this soon :)

dstandish and others added 10 commits November 13, 2024 19:14
This is meant to be sort of a hello world for asyncio support in airflow.  It will be refined and extended in the future.  But I think airflow ultimately really needs to go in this direction: in the new REST API, in the new AIP-72 internal API server, in triggers, and ultimately, in the scheduler.
@omkar-foss
Copy link
Collaborator

@dstandish in lowest direct dep tests, we may need to limit minimum version for aiosqlite to 0.5.0 instead of 0.2.0 being used currently, as it doesn't have DatabaseError, hence leading to this error.

More info on this slack thread.

session = create_async_session()
session.add(Log(event="hihi1234"))
await session.commit()
l = await session.scalar(select(Log).where(Log.event == "hihi1234").limit(1)) # noqa: E741
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can avoid this noqa by just change the var name :)

Comment on lines +66 to +67
l = await session.scalar(select(Log).where(Log.event == "hihi1234").limit(1)) # noqa: E741
assert l.event == "hihi1234"
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
l = await session.scalar(select(Log).where(Log.event == "hihi1234").limit(1)) # noqa: E741
assert l.event == "hihi1234"
log_result = await session.scalar(select(Log).where(Log.event == "hihi1234").limit(1)) # noqa: E741
assert log_result.event == "hihi1234"

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

7 participants